//  Copyright 2022 AntGroup CO., Ltd.
//  Licensed under the Apache License, Version 2.0 (the "License");
//  you may not use this file except in compliance with the License.
//  You may obtain a copy of the License at
//  http://www.apache.org/licenses/LICENSE-2.0
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

#pragma once

#include "fma-common/file_system.h"
#include "fma-common/fma_stream.h"
#include "fma-common/text_parser.h"

namespace fma_common {
// Abstract naming policy for a set of rotating files. An implementation
// produces the names of new files, recognizes and orders the names it has
// produced, and answers whether a file should be deleted.
class RotatingFileNameGenerator {
 public:
    virtual ~RotatingFileNameGenerator() = default;

    // Returns the name (not the full path) to use for the next file.
    virtual std::string GetNextFileName() = 0;

    // Ordering predicate over file names: returns true if lhs sorts before rhs.
    virtual bool NameCompareLess(const std::string& lhs, const std::string& rhs) const = 0;

    // Returns true if fname is a name this generator recognizes as its own.
    virtual bool IsMyFile(const std::string& fname) const = 0;

    // Inspects the file names (names only, no directory component) that already
    // exist and initializes the generator's internal state from them.
    // Returns the subset of existing_files that belongs to this generator.
    virtual std::vector<std::string> CheckFileNamesAndInit(
        const std::vector<std::string>& existing_files) = 0;

    // Deletion policy query: returns true if the given file should be deleted.
    // NOTE(review): the parameter is named `files` but is a single string;
    // presumably one file name/path per call -- confirm with implementations.
    virtual bool ShouldDelete(const std::string& files) const = 0;
};

// File name generator that produces names of the form <prefix><index>, where
// <index> is an integer that increases by one for every generated name
// (e.g. with prefix "log." the names are "log.0", "log.1", ...).
class IndexedFileNameGenerator : public RotatingFileNameGenerator {
    std::string prefix_;
    int64_t curr_fid_ = 0;  // index that the next generated name will carry

    // Tries to interpret fname as <prefix_><integer>.
    // Returns true and stores the integer in idx only if fname starts with
    // prefix_ and the whole remainder is consumed by ParseInt64.
    bool TryParseFileName(const std::string& fname, int64_t& idx) const {
        // "<=" rather than "<": a name equal to the bare prefix has no index
        // and must be rejected; otherwise the empty remainder would trivially
        // satisfy the "everything was consumed" check below.
        if (fname.size() <= prefix_.size()) return false;
        if (memcmp(prefix_.data(), fname.data(), prefix_.size()) != 0) return false;
        const char* beg = fname.data() + prefix_.size();
        const char* end = fname.data() + fname.size();
        // The full suffix must be consumed, so names with trailing characters
        // after the number are rejected.
        if (TextParserUtils::ParseInt64(beg, end, idx) != (size_t)(end - beg)) return false;
        return true;
    }

 public:
    explicit IndexedFileNameGenerator(const std::string& prefix) : prefix_(prefix), curr_fid_(0) {}

    // Returns <prefix><current index> and advances the index.
    std::string GetNextFileName() override { return prefix_ + std::to_string(curr_fid_++); }

    // Orders two file names by their numeric index (not lexicographically, so
    // "<prefix>2" sorts before "<prefix>10").
    // Callers are expected to pass names accepted by IsMyFile(); the parse
    // result is not checked here.
    bool NameCompareLess(const std::string& lhs, const std::string& rhs) const override {
        int64_t a = 0;
        int64_t b = 0;
        TryParseFileName(lhs, a);
        TryParseFileName(rhs, b);
        return a < b;
    }

    // Returns true if fname has the form <prefix><integer>.
    bool IsMyFile(const std::string& fname) const override {
        int64_t idx = 0;
        return TryParseFileName(fname, idx);
    }

    // Scans existing_files, keeps the names of the form <prefix><integer>, and
    // sets the next index to (largest existing index + 1), or to 0 when no
    // existing name matches.
    // Returns the matching names in their input order.
    std::vector<std::string> CheckFileNamesAndInit(
        const std::vector<std::string>& existing_files) override {
        std::vector<std::string> ret;
        curr_fid_ = -1;  // becomes 0 after the final increment if nothing matches
        for (auto& f : existing_files) {
            int64_t id = 0;
            if (!TryParseFileName(f, id)) continue;
            curr_fid_ = std::max(curr_fid_, id);
            ret.push_back(f);
        }
        curr_fid_++;
        return ret;
    }

    // This generator never requests deletion: always returns false.
    bool ShouldDelete(const std::string& files) const override { return false; }
};

// A sequence of output files living in one directory. Data is appended to the
// current file; once the current file reaches max_file_size_ it is closed and
// a new file, named by the RotatingFileNameGenerator, is opened.
//
// The public methods take mu_ before touching state. Methods with the
// "NoLock" suffix do not lock and are called with mu_ already held (or from
// the constructor).
//
// Subclasses can override DeleteOldFilesIfNecessaryNoLock() to implement a
// retention policy; the default keeps every file.
class RotatingFiles {
 protected:
    std::mutex mu_;
    std::string dir_;
    FileSystem& fs_;
    std::unique_ptr<RotatingFileNameGenerator> fn_gen_;
    size_t max_file_size_ = (size_t)1 << 30;

    // Full paths of all files, oldest first; the last entry is the file
    // currently open in curr_file_.
    std::deque<std::string> file_paths_;
    OutputFmaStream curr_file_;

 public:
    // Creates dir if it is not already a directory, adopts the files in it
    // that file_name_generator recognizes (sorted with NameCompareLess), and
    // opens a new file for writing.
    //
    // dir                  directory holding the files
    // file_name_generator  naming policy; ownership is taken
    // max_file_size        size at which the current file is rotated
    //
    // Throws std::runtime_error if the directory cannot be created or the
    // new file cannot be opened.
    RotatingFiles(const std::string& dir,
                  std::unique_ptr<RotatingFileNameGenerator>&& file_name_generator,
                  size_t max_file_size)
        : dir_(dir),
          fs_(FileSystem::GetFileSystem(dir_)),
          fn_gen_(std::move(file_name_generator)),
          max_file_size_(max_file_size) {
        if (!fs_.IsDir(dir_) && !fs_.Mkdir(dir_))
            throw std::runtime_error("Failed to create dir " + dir_);
        // get all file names under that dir, then keep only the generator's own
        std::vector<std::string> files = fs_.ListFiles(dir_, nullptr, false);
        files = fn_gen_->CheckFileNamesAndInit(files);
        // sort files
        std::sort(files.begin(), files.end(),
                  [this](const std::string& lhs, const std::string& rhs) {
                      return fn_gen_->NameCompareLess(lhs, rhs);
                  });
        for (auto& f : files) file_paths_.push_back(GetFullPath(f));
        OpenNewFileNoLock();
    }

    // Virtual because the class exposes a virtual hook and protected state for
    // subclasses; makes destruction through a RotatingFiles pointer well-defined.
    virtual ~RotatingFiles() = default;

    // Flushes the current file and returns the full paths of all files,
    // oldest first, including the file currently being written.
    std::vector<std::string> ListFiles() {
        std::lock_guard<std::mutex> l(mu_);
        curr_file_.Flush();
        return std::vector<std::string>(file_paths_.begin(), file_paths_.end());
    }

    // Appends size bytes from buf to the current file, flushing afterwards if
    // flush is true. The size check happens after the write, so a message is
    // never split across files; when the file has reached max_file_size_ it is
    // closed, a new one is opened, and the deletion hook is invoked.
    void Write(const void* buf, size_t size, bool flush) {
        std::lock_guard<std::mutex> l(mu_);
        curr_file_.Write(buf, size);
        if (flush) curr_file_.Flush();
        if (curr_file_.Size() >= max_file_size_) {
            curr_file_.Close();
            OpenNewFileNoLock();
            DeleteOldFilesIfNecessaryNoLock();
        }
    }

    // Convenience overload writing the bytes of msg.
    void Write(const std::string& msg, bool flush = false) { Write(msg.data(), msg.size(), flush); }

    // Flushes the current file.
    void Flush() {
        std::lock_guard<std::mutex> l(mu_);
        curr_file_.Flush();
    }

    // Deletes all existing files, including the one currently being written,
    // and then opens a new file.
    void DeleteExistingFiles() {
        std::lock_guard<std::mutex> l(mu_);
        // close first so the current file is not removed while still open
        curr_file_.Close();
        for (auto& f : file_paths_) {
            fs_.Remove(f);
        }
        file_paths_.clear();
        OpenNewFileNoLock();
    }

 protected:
    // Joins dir_ and name using the file system's path separator.
    std::string GetFullPath(const std::string& name) const {
        return dir_ + fs_.PathSeparater() + name;
    }

    // Asks the generator for the next name, opens that file as the current
    // file, and records its path. Caller must hold mu_ (or be the constructor).
    // Throws std::runtime_error if the file cannot be opened.
    void OpenNewFileNoLock() {
        std::string name = fn_gen_->GetNextFileName();
        std::string path = GetFullPath(name);
        // NOTE(review): 65536 is presumably the stream buffer size -- confirm
        // against OutputFmaStream::Open.
        curr_file_.Open(path, 65536);
        if (!curr_file_.Good()) throw std::runtime_error("Failed to open file " + path);
        file_paths_.push_back(path);
    }

    // Hook invoked by Write() with mu_ held, right after a rotation.
    // The default implementation deletes nothing.
    virtual void DeleteOldFilesIfNecessaryNoLock() {}
};
}  // namespace fma_common
